/**
 * Copyright 2019 LinkedIn Corp. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 */
package com.github.ambry.cloud;

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.github.ambry.cloud.azure.AzureCloudConfig;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.commons.BlobId;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MockMessageWriteSet;
import com.github.ambry.utils.TestUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import reactor.core.publisher.Flux;

import static com.github.ambry.commons.BlobId.*;


public class CloudTestUtil {

  /**
   * Utility method to add a BlobId and generated byte buffer to the specified MessageWriteSet.
   * @param messageWriteSet the {@link MockMessageWriteSet} in which to store the data.
   * @param blobId the blobId to add.
   * @param size the size of the byte buffer.
   * @param expiresAtMs the expiration time.
   * @param operationTime the operation time.
   * @param isVcr is the test running as vcr.
   */
  static void addBlobToMessageSet(MockMessageWriteSet messageWriteSet, BlobId blobId, long size, long expiresAtMs,
      long operationTime, boolean isVcr) {
    long crc = new Random().nextLong();
    MessageInfo info =
        new MessageInfo(blobId, size, false, true, false, expiresAtMs, crc, (short) 50, (short) 100, operationTime,
            initLifeVersion(isVcr));
    ByteBuffer buffer = ByteBuffer.wrap(TestUtils.getRandomBytes((int) size));
    messageWriteSet.add(info, buffer);
  }

  /**
   * Utility method to generate a BlobId and byte buffer for a blob with specified properties and add them to the specified MessageWriteSet.
   * @param messageWriteSet the {@link MockMessageWriteSet} in which to store the data.
   * @param size the size of the byte buffer.
   * @param expiresAtMs the expiration time.
   * @param accountId the account Id.
   * @param containerId the container Id.
   * @param encrypted the encrypted bit.
   * @param deleted true if blob is deleted.
   * @param partitionId the partition id.
   * @param operationTime the operation time.
   * @param isVcr flag to indicate if running as vcr.
   * @return the generated {@link BlobId}.
   */
  static BlobId addBlobToMessageSet(MockMessageWriteSet messageWriteSet, long size, long expiresAtMs, short accountId,
      short containerId, boolean encrypted, boolean deleted, PartitionId partitionId, long operationTime,
      boolean isVcr) {
    BlobId id = getUniqueId(accountId, containerId, encrypted, partitionId);
    long crc = new Random().nextLong();
    MessageInfo info =
        new MessageInfo(id, size, deleted, true, false, expiresAtMs, crc, accountId, containerId, operationTime,
            initLifeVersion(isVcr));
    ByteBuffer buffer = ByteBuffer.wrap(TestUtils.getRandomBytes((int) size));
    messageWriteSet.add(info, buffer);
    return id;
  }

  /**
   * Utility method to generate a BlobId and byte buffer for a blob with specified properties and add them to the specified MessageWriteSet.
   * @param messageWriteSet the {@link MockMessageWriteSet} in which to store the data.
   * @param expiresAtMs the expiration time.
   * @param accountId the account Id.
   * @param containerId the container Id.
   * @param partitionId the partition id.
   * @param operationTime the operation time.
   * @param lifeVersion the life version.
   */
  static void addBlobToMessageSet(MockMessageWriteSet messageWriteSet, long expiresAtMs, short accountId,
      short containerId, PartitionId partitionId, long operationTime, short lifeVersion) {
    long size = 100;
    BlobId id = getUniqueId(accountId, containerId, false, partitionId);
    long crc = new Random().nextLong();
    MessageInfo info =
        new MessageInfo(id, size, false, true, false, expiresAtMs, crc, accountId, containerId, operationTime,
            lifeVersion);
    ByteBuffer buffer = ByteBuffer.wrap(TestUtils.getRandomBytes((int) size));
    messageWriteSet.add(info, buffer);
  }

  /**
   * Utility method to generate a {@link BlobId} with specified account and container.
   * @param accountId the account Id.
   * @param containerId the container Id.
   * @param encrypted the encrypted bit.
   * @return the generated {@link BlobId}.
   */
  static BlobId getUniqueId(short accountId, short containerId, boolean encrypted, PartitionId partitionId) {
    byte dataCenterId = 66;
    return new BlobId(BLOB_ID_V6, BlobIdType.NATIVE, dataCenterId, accountId, containerId, partitionId, encrypted,
        BlobDataType.DATACHUNK);
  }

  /**
   * @return -1 if not vcr. 0 otherwise.
   */
  static short initLifeVersion(boolean isVcr) {
    return (short) (isVcr ? 0 : -1);
  }

  /**
   * Cleanup the specified partition in azure by deleting all the blobs of the partition.
   * @param azureCloudConfig Properties containing the credentials needed for connection to azure.
   * @param partitionId partition to be deleted.
   */
  static void cleanupPartition(AzureCloudConfig azureCloudConfig, PartitionId partitionId) {
    CosmosAsyncClient cosmosAsyncClient = new CosmosClientBuilder().endpoint(azureCloudConfig.cosmosEndpoint)
        .key(azureCloudConfig.cosmosKey)
        .consistencyLevel(ConsistencyLevel.SESSION)
        .buildAsyncClient();
    CosmosAsyncContainer cosmosAsyncContainer = cosmosAsyncClient.getDatabase(azureCloudConfig.cosmosDatabase)
        .getContainer(azureCloudConfig.cosmosCollection);

    List<CloudBlobMetadata> cloudBlobMetadataList = new ArrayList<>();

    // Query items
    SqlQuerySpec sqlQuerySpec =
        new SqlQuerySpec("select * from c where c.partitionId=\"" + partitionId.toPathString() + "\"");
    CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
    cosmosQueryRequestOptions.setPartitionKey(new PartitionKey(partitionId.toPathString()));

    cosmosAsyncContainer.queryItems(sqlQuerySpec, cosmosQueryRequestOptions, CloudBlobMetadata.class)
        .byPage()
        .flatMapSequential(feedResponse -> {
          cloudBlobMetadataList.addAll(feedResponse.getResults());
          return Flux.empty();
        })
        .blockLast();

    // Delete items
    cloudBlobMetadataList.forEach(cloudBlobMetadata -> cosmosAsyncContainer.deleteItem(cloudBlobMetadata.getId(),
        new PartitionKey(partitionId.toPathString())).block());
  }
}
